Source code for hysop.operator.adapt_timestep

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
Update time-step, depending on the flow field.
See :ref:`adaptive time_step` for details.
"""
import numpy as np

from abc import ABCMeta, abstractmethod
from hysop.constants import HYSOP_REAL, StretchingCriteria, AdvectionCriteria
from hysop.tools.htypes import check_instance, first_not_None
from hysop.tools.decorators import debug
from hysop.core.graph.node_generator import ComputationalGraphNodeGenerator
from hysop.core.graph.computational_operator import ComputationalGraphOperator
from hysop.core.graph.graph import op_apply
from hysop.fields.continuous_field import Field
from hysop.parameters import ScalarParameter, TensorParameter
from hysop.parameters.parameter import Parameter
from hysop.core.mpi import MPI
from hysop.backend.host.host_operator import HostOperatorBase


[docs] class TimestepCriteria(HostOperatorBase, metaclass=ABCMeta): @debug def __new__( cls, parameter, input_params, output_params, dt_coeff=None, min_dt=None, max_dt=None, **kwds, ): return super().__new__( cls, input_params=input_params, output_params=output_params, **kwds ) @debug def __init__( self, parameter, input_params, output_params, dt_coeff=None, min_dt=None, max_dt=None, **kwds, ): """ Initialize an AdaptiveTimeStep operator. Parameters ---------- parameter: ScalarParameter Timestep parameter that will be updated. input_params: set Input parameters used to compute criteria. output_params: set Output parameters used to compute criteria. min_dt : float, optional Minimum value allowed for time step, defaults to 0. max_dt : float, optional Maximum value allowed for time step, defaults to 1e8. dt_coeff: float, optional Constant coefficient applied to resulting dt before resolving min and max values. Defaults to 1. kwds: dict Base class arguments. """ check_instance(parameter, ScalarParameter) check_instance(input_params, set, values=Parameter) check_instance(output_params, set, values=Parameter) assert (min_dt is None) or (min_dt > 0.0) assert (max_dt is None) or (max_dt > 0.0) assert (min_dt is None) or (max_dt is None) or (max_dt >= min_dt) assert (dt_coeff is None) or (dt_coeff > 0.0) assert parameter in output_params super().__init__(input_params=input_params, output_params=output_params, **kwds) self.min_dt = 0.0 if (min_dt is None) else min_dt self.max_dt = 1e8 if (max_dt is None) else max_dt self.dt_coeff = 1.0 if (dt_coeff is None) else dt_coeff dt = parameter self.dt = dt # Collect values from all MPI process if self.mpi_params.size == 1: self._collect_min = lambda e: e self._collect_max = lambda e: e else: comm = self.mpi_params.comm self._sendbuff = np.zeros((1,), dtype=dt.dtype) self._recvbuff = np.zeros((1,), dtype=dt.dtype) def _collect_max(val): self._sendbuff[0] = val comm.Allreduce(self._sendbuff, self._recvbuff, op=MPI.MAX) return self._recvbuff[0] def _collect_min(val): self._sendbuff[0] = val comm.Allreduce(self._sendbuff, self._recvbuff, op=MPI.MIN) return self._recvbuff[0] self._collect_max = _collect_max self._collect_min = _collect_min @op_apply def apply(self, **kwds): dt = self.compute_criteria(**kwds) dt *= self.dt_coeff dt = self._collect_max(np.maximum(dt, self.min_dt)) dt = self._collect_min(np.minimum(dt, self.max_dt)) assert dt > 0.0, "negative or zero timestep encountered." self.dt.set_value(dt)
[docs] @classmethod def supports_mpi(cls): return True
[docs] @abstractmethod def compute_criteria(cls, **kwds): pass
[docs] class ConstantTimestepCriteria(TimestepCriteria): @debug def __new__(cls, cst, parameter, Finf, name=None, pretty_name=None, **kwds): return super().__new__( cls, name=name, pretty_name=pretty_name, input_params=None, output_params=None, parameter=parameter, **kwds, ) @debug def __init__(self, cst, parameter, Finf, name=None, pretty_name=None, **kwds): """ Initialize a ConstantTimestepCriteria. Compute a timestep criteria for an arbitrary field F. dt = cst / Max_i(|Fi|inf) where i in [0, F.nb_components-1] Parameters ---------- cst: float or array-like of float Constraint constant (per component, same shape as Finf). Finf: ScalarParameter or TensorParameter A tensor parameter that contains |F|inf for every considered components. parameter: ScalarParameter The output parameter that will store the computed timestep. kwds: dict Base class arguments. """ if isinstance(cst, int): cst = float(cst) assert cst > 0.0, "negative cst factor." check_instance(cst, (float, np.ndarray, list, tuple)) check_instance(Finf, (ScalarParameter, TensorParameter)) if isinstance(Finf, ScalarParameter): assert isinstance(cst, float) is_scalar = True else: is_scalar = False if isinstance(cst, float): cst = np.full(shape=Finf.shape, dtype=Finf.dtype, fill_value=cst) if isinstance(cst, (list, tuple)): assert Finf.ndim == 1 cst = np.asarray(cst) msg = "Shape mismatch between parameter {} and cst {}." msg = msg.format(Finf.shape, cst.shape) assert Finf.shape == cst.shape, msg name = first_not_None(name, "CST") pretty_name = first_not_None(pretty_name, name) super().__init__( name=name, pretty_name=pretty_name, input_params={Finf.name: Finf}, output_params={parameter.name: parameter}, parameter=parameter, **kwds, ) self.cst = cst self.Finf = Finf self.is_scalar = is_scalar
[docs] def compute_criteria(self, **kwds): cst = self.cst Finf = self.Finf() if self.is_scalar: assert Finf >= 0 if Finf == 0: return np.inf else: return cst / Finf else: assert Finf.min() >= 0 mask = Finf != 0 dt = np.full_like(cst, fill_value=np.inf) dt[mask] = cst[mask] / Finf[mask] return dt.min()
[docs] class CflTimestepCriteria(TimestepCriteria): @debug def __new__( cls, cfl, parameter, Finf=None, Fmin=None, Fmax=None, dx=None, name=None, pretty_name=None, relative_velocities=None, **kwds, ): return super().__new__( cls, name=name, pretty_name=pretty_name, input_params=None, output_params=None, parameter=parameter, **kwds, ) @debug def __init__( self, cfl, parameter, Finf=None, Fmin=None, Fmax=None, dx=None, name=None, pretty_name=None, relative_velocities=None, **kwds, ): """ Initialize a CflTimestepCriteria. Compute dt = cfl * Min_i(dx[i] / F[i].inf) cfl = (Vinf/dx)*dt Parameters ---------- cfl: float CFL value used to compute timestep. Finf: TensorParameter A tensor parameter that contains Finf for every components. Can be specified instead of Fmin and Fmax: *Fmin will be set to -Finf *Fmax will be set to +Finf Fmin: TensorParameter A tensor parameter that contains Fmin for every components. Fmax: TensorParameter A tensor parameter that contains Fmax for every components. parameter: ScalarParameter The output parameter that will store the computed timestep. dx: tuple of float Space discretization, should be of size F.nb_components. If not given, will be extracted from Finf on setup. relative_velocities: array like of relative velocities, optional Specify relative velocities. kwds: dict Base class arguments. """ assert cfl > 0.0, "negative cfl condition." check_instance(cfl, float) if Finf is None: check_instance(Fmin, TensorParameter) check_instance(Fmax, TensorParameter) assert Fmin.shape == Fmax.shape input_params = {Fmin, Fmax} dtype = Fmin.dtype shape = Fmin.shape size = Fmin.size else: check_instance(Finf, TensorParameter) msg = "Cannot specify (Fmin,Fmax) and Finf at the same time." assert Fmin is None, msg assert Fmax is None, msg input_params = {Finf} dtype = Finf.dtype shape = Finf.shape size = Finf.size check_instance(parameter, ScalarParameter) check_instance(dx, tuple, values=float, allow_none=True) check_instance(relative_velocities, (tuple, list), allow_none=True) name = first_not_None(name, "CFL") pretty_name = first_not_None(pretty_name, name) super().__init__( name=name, pretty_name=pretty_name, input_params=input_params, output_params={parameter}, parameter=parameter, **kwds, ) if relative_velocities is None: relative_velocities = [(0,) * size] assert len(relative_velocities) >= 1 rv = () for Vr in relative_velocities: Vr = np.asarray(Vr, dtype=dtype) assert Vr.shape == shape rv += (Vr,) relative_velocities = rv self.cfl = cfl self.Fmin = Fmin self.Fmax = Fmax self.Finf = Finf self.dx = dx self.relative_velocities = relative_velocities
[docs] def setup(self, **kwds): super().setup(**kwds) if self.dx is None: dx = None for attr in ("Finf", "Fmin", "Fmax"): attr = getattr(self, attr) if hasattr(attr, "min_max_dfield"): dx = attr.min_max_dfield.get_unique_attribute( "mesh", "_mesh", "_space_step" )[::-1] if dx is None: msg = "Could not extract dx from Fmin." raise RuntimeError(msg) self.dx = dx
[docs] def compute_criteria(self, **kwds): cfl = self.cfl dx = self.dx if self.Finf is None: Fmin = self.Fmin() Fmax = self.Fmax() else: Finf = self.Finf() Fmin = -Finf Fmax = +Finf assert len(dx) == Fmin.size == Fmax.size assert len(self.relative_velocities) >= 1 dt = np.inf for Vr in self.relative_velocities: Vmin = Fmin - Vr Vmax = Fmax - Vr Vinf = np.maximum(np.abs(Vmin), np.abs(Vmax)) if np.all(np.divide(Vinf, dx) == 0): cdt = cfl * np.inf else: cdt = cfl / np.max(np.divide(Vinf, dx)) dt = min(dt, cdt) return dt
[docs] def compute_cfl(self, dt): mdt = self.compute_criteria() return (dt / mdt) * self.cfl
[docs] @classmethod def supports_mpi(cls): return True
[docs] class AdvectionTimestepCriteria(TimestepCriteria): @debug def __new__( cls, lcfl, parameter, criteria, Finf=None, gradFinf=None, name=None, pretty_name=None, **kwds, ): return super().__new__( cls, name=name, pretty_name=pretty_name, input_params=None, output_params=None, parameter=parameter, **kwds, ) @debug def __init__( self, lcfl, parameter, criteria, Finf=None, gradFinf=None, name=None, pretty_name=None, **kwds, ): """ Initialize a AdvectionTimestepCriteria. Compute a timestep criteria for advection: W_INF: dt = lcfl / Max_i(|Wi|) GRAD_U: dt = lcfl / Max_i(|dUi/dXi|) DEFORMATION: dt = lcfl / Max_i( Sum_j(|Sij|) ) where Sij is the deformation tensor Sij = (dUi/dXj + dUj/dXi)/2 where |dUi/dXj| = |gradF|_inf |Wi| = |F|_inf Parameters ---------- lcfl: float Lagrangian CFL constant. Finf: TensorParameter, optional A tensor parameter that contains |W|inf for every components. of the vorticity. gradFinf: TensorParameter A tensor parameter that contains |gradF|inf for every components in every directions, ie. the inf. norm of the gradient of velocity. parameter: ScalarParameter The output parameter that will store the computed timestep. criteria: AdvectionCriteria The chosen criteria. Currently only W_INF, GRAD_U, and DEFORMATION is supported. kwds: dict Base class arguments. """ assert lcfl > 0.0, "negative LCFL factor." check_instance(lcfl, float) check_instance(Finf, TensorParameter, allow_none=True) check_instance(gradFinf, TensorParameter, allow_none=True) check_instance(parameter, ScalarParameter) check_instance(criteria, AdvectionCriteria) input_params = set() if Finf is not None: assert Finf().ndim == 1, "Finf should be a 1D tensor parameter." input_params.update({Finf}) if gradFinf is not None: assert gradFinf().ndim == 2, "gradFinf should be a 2D tensor parameter." input_params.update({gradFinf}) name = first_not_None(name, "LCFL") pretty_name = first_not_None(pretty_name, name) super().__init__( name=name, pretty_name=pretty_name, input_params=input_params, output_params={parameter}, parameter=parameter, **kwds, ) self.lcfl = lcfl self.Finf = Finf self.gradFinf = gradFinf self.criteria = criteria
[docs] def compute_criteria(self, **kwds): criteria = self.criteria lcfl = self.lcfl if criteria is AdvectionCriteria.W_INF: Finf = self.Finf() if np.max(Finf) == 0: return lcfl * np.inf else: return lcfl / np.max(Finf) elif criteria is AdvectionCriteria.GRAD_U: gradFinf = self.gradFinf() if gradFinf.ndim == 2: gradFinf = np.diag(gradFinf) # extract diagonal return lcfl / np.max(gradFinf) elif criteria is AdvectionCriteria.DEFORMATION: gradFinf = self.gradFinf() gradFinf = (gradFinf + gradFinf.T) / 2.0 return lcfl / np.max(gradFinf.sum(axis=0)) else: msg = f"Unsupported stretching criteria {criteria}." raise RuntimeError(msg)
[docs] class StretchingTimestepCriteria(TimestepCriteria): @debug def __new__( cls, gradFinf, parameter, cst=1.0, criteria=StretchingCriteria.GRAD_U, name=None, pretty_name=None, **kwds, ): return super().__new__( cls, name=name, pretty_name=pretty_name, input_params=None, output_params=None, parameter=parameter, **kwds, ) @debug def __init__( self, gradFinf, parameter, cst=1.0, criteria=StretchingCriteria.GRAD_U, name=None, pretty_name=None, **kwds, ): """ Initialize a StretchingTimestepCriteria. Compute dt = cst / Max_i( Sum_j( |dFi/dXj| ) ) where |dFi/dXj| = |gradF|_inf_ij ie. dt = cst / np.max( gradFinf.sum(axis=1) ) Parameters ---------- cst: float Factor used to compute timestep. gradFinf: TensorParameter A tensor parameter that contains |gradF|inf for every components in every directions. parameter: ScalarParameter The output parameter that will store the computed timestep. criteria: StretchingCriteria The chosen criteria. Currently only StretchingCriteria.GRAD_U is supported. kwds: dict Base class arguments. """ assert cst > 0.0, "negative constant factor." check_instance(cst, float) check_instance(gradFinf, TensorParameter) check_instance(parameter, ScalarParameter) check_instance(criteria, StretchingCriteria) assert gradFinf().ndim == 2, "gradFinf should be a 2D tensor." name = first_not_None(name, "STRETCH") pretty_name = first_not_None(pretty_name, name) super().__init__( name=name, pretty_name=pretty_name, input_params={gradFinf}, output_params={parameter}, parameter=parameter, **kwds, ) self.cst = cst self.gradFinf = gradFinf self.criteria = criteria
[docs] def compute_criteria(self, **kwds): criteria = self.criteria if criteria is StretchingCriteria.GRAD_U: gradFinf = self.gradFinf() return self.cst / np.max(gradFinf.sum(axis=1)) else: msg = f"Unsupported stretching criteria {criteria}." raise RuntimeError(msg)
[docs] class MergeTimeStepCriterias(TimestepCriteria): @debug def __new__( cls, parameter, criterias, equivalent_CFL=None, cfl_criteria=None, start_time=None, **kwds, ): return super().__new__( cls, input_params=None, output_params=None, parameter=parameter, **kwds ) @debug def __init__( self, parameter, criterias, equivalent_CFL=None, cfl_criteria=None, start_time=None, **kwds, ): check_instance(parameter, ScalarParameter) check_instance(criterias, dict, keys=str, values=TimestepCriteria) check_instance(equivalent_CFL, ScalarParameter, allow_none=True) output_params = {parameter} input_params = set() for criteria in criterias.values(): input_params.update(criteria.output_params) assert not ((equivalent_CFL is not None) ^ (cfl_criteria is not None)) self.equivalent_CFL = equivalent_CFL self.cfl_criteria = cfl_criteria self._start_time = start_time super().__init__( parameter=parameter, input_params=input_params, output_params=output_params, **kwds, )
[docs] def compute_criteria(self, **kwds): dt = min(p.value for p in self.input_params.keys()) if self.equivalent_CFL is not None: cfl = self.cfl_criteria.compute_cfl(dt) self.equivalent_CFL.set_value(cfl) return dt
[docs] @debug def apply(self, simulation, **kwds): assert ( simulation.dt is self.dt ), "Parameter mismatch between Simulation and AdaptiveTimeStep." if (self._start_time is None) or (simulation.t() > self._start_time): super().apply(simulation=simulation, **kwds)
[docs] class AdaptiveTimeStep(ComputationalGraphNodeGenerator): """ The adaptative Time Step is computed according to the following expression : dt = max(min(max_dt, dt_criterias), min_dt) """ @debug def __new__( cls, dt, min_dt=None, max_dt=None, dt_coeff=None, equivalent_CFL=False, base_kwds=None, start_time=None, **kwds, ): base_kwds = first_not_None(base_kwds, {}) return super().__new__( cls, candidate_input_tensors=None, candidate_output_tensors=None, **base_kwds, ) @debug def __init__( self, dt, min_dt=None, max_dt=None, dt_coeff=None, equivalent_CFL=False, base_kwds=None, start_time=None, **kwds, ): """ Initialize an AdaptiveTimeStep operator. Parameters ---------- dt: hysop.parameter.scalar_parameter.ScalarParameter The parameter that will be updated. min_dt : double, optional Minimum value allowed for time step. max_dt : double, optional Maximum value allowed for time step. dt_coeff: double, optional Constant coefficient applied to resulting dt after resolving min and max values. start_time: double, optional Simulation time when starting to adapt timestep param_name: str, optional Output dt parameter name (default is 'dt'). base_kwds: dict Base kwds of this class. kwds : Additional keywords arguments that will be passed to MergeTimeStepCriterias. Notes ----- * This operator has no effect on input variables. """ base_kwds = first_not_None(base_kwds, {}) super().__init__( candidate_input_tensors=(), candidate_output_tensors=(), **base_kwds ) # tuple of criterias used to compute dt self.criterias = {} self.merge_kwds = { "min_dt": min_dt, "max_dt": max_dt, "dt_coeff": dt_coeff, "start_time": start_time, } self.merge_kwds.update(**kwds) self.equivalent_CFL = None self.cfl_criteria = None self.parameter = dt
[docs] def push_cst_criteria( self, cst, Finf=None, name=None, pretty_name=None, param_name=None, param_pretty_name=None, parameter=None, quiet=False, dtype=None, **kwds, ): parameter = self._build_parameter( parameter=parameter, quiet=quiet, name=param_name, pretty_name=param_pretty_name, basename=name.replace("dt_", ""), dtype=dtype, ) criteria = ConstantTimestepCriteria( cst=cst, Finf=Finf, parameter=parameter, name=name, pretty_name=pretty_name, **kwds, ) self._push_criteria(parameter.name, criteria)
[docs] def push_cfl_criteria( self, cfl, Fmin=None, Fmax=None, Finf=None, dx=None, name=None, pretty_name=None, param_name=None, param_pretty_name=None, parameter=None, quiet=False, relative_velocities=None, equivalent_CFL=None, dtype=None, **kwds, ): """ See hysop.operator.adapt_timpestep.CflTimestepCriteria. """ parameter = self._build_parameter( parameter=parameter, quiet=quiet, name=param_name, pretty_name=param_pretty_name, basename="cfl", dtype=dtype, ) criteria = CflTimestepCriteria( cfl=cfl, Fmin=Fmin, Fmax=Fmax, Finf=Finf, dx=dx, parameter=parameter, name=name, pretty_name=pretty_name, relative_velocities=relative_velocities, **kwds, ) self._push_criteria(parameter.name, criteria) if isinstance(equivalent_CFL, ScalarParameter): cfl_criteria = criteria elif equivalent_CFL is True: equivalent_CFL = ScalarParameter(name="CFL*") cfl_criteria = criteria else: equivalent_CFL = None cfl_criteria = None self.equivalent_CFL = equivalent_CFL self.cfl_criteria = cfl_criteria return criteria.dt
[docs] def push_advection_criteria( self, lcfl, criteria, Finf=None, gradFinf=None, name=None, pretty_name=None, param_name=None, param_pretty_name=None, parameter=None, quiet=False, dtype=None, **kwds, ): """ See hysop.operator.adapt_timpestep.AdvectionTimestepCriteria. """ parameter = self._build_parameter( parameter=parameter, quiet=quiet, dtype=dtype, name=param_name, pretty_name=param_pretty_name, basename=f"lcfl_{str(criteria).lower()}", ) criteria = AdvectionTimestepCriteria( lcfl=lcfl, Finf=Finf, gradFinf=gradFinf, parameter=parameter, criteria=criteria, name=name, pretty_name=pretty_name, **kwds, ) self._push_criteria(parameter.name, criteria) return criteria.dt
[docs] def push_lcfl_criteria(self, *args, **kwds): """ See hysop.operator.adapt_timpestep.AdvectionTimestepCriteria. """ return self.push_advection_criteria(*args, **kwds)
[docs] def push_stretching_criteria( self, criteria, gradFinf, cst=1.0, name=None, pretty_name=None, parameter=None, quiet=False, dtype=None, **kwds, ): """ See hysop.operator.adapt_timpestep.StretchingTimestepCriteria. """ parameter = self._build_parameter( parameter=parameter, quiet=quiet, name=name, pretty_name=pretty_name, basename="stretch", dtype=dtype, ) criteria = StretchingTimestepCriteria( cst=cst, parameter=parameter, gradFinf=gradFinf, criteria=criteria, **kwds ) self._push_criteria(parameter.name, criteria) return criteria.dt
def _build_parameter( self, name=None, pretty_name=None, quiet=None, basename=None, parameter=None, dtype=None, ): if parameter is None: name = first_not_None(name, f"dt_{basename}") pretty_name = first_not_None(pretty_name, name) dtype = first_not_None(dtype, HYSOP_REAL) parameter = ScalarParameter( name=name, pretty_name=pretty_name, quiet=quiet, dtype=dtype ) return parameter def _push_criteria(self, name, criteria): assert not self.generated, "Node was already generated." if name in self.criterias: msg = f"Criteria {name} was already registered." raise RuntimeError(msg) self.criterias[name] = criteria def _generate(self): assert not self.generated, "Node was already generated." if not self.criterias: msg = "No timestep criterias were ever setup." raise RuntimeError(msg) # we need to compute the minimum timestep of all criterias # through a new operator. self.merge_kwds.setdefault("name", "DT") merge = MergeTimeStepCriterias( parameter=self.parameter, equivalent_CFL=self.equivalent_CFL, cfl_criteria=self.cfl_criteria, criterias=self.criterias, **self.merge_kwds, ) operators = list(self.criterias.values()) operators.append(merge) return operators